{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# PySpark Quickstart ...\n",
    "by wangerqi@supermap.com,2016-03-21."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Spark的Python客户端\n",
    "由PySpark,SparkConf,SparkContext三个主要对象组成。pyspark支持本地、集群、混合三种模式运行。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark import SparkConf, SparkContext"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<module 'pyspark.status' from '/media/supermap/Application/OpenAI/anaconda3/envs/tensor/lib/python3.5/site-packages/pyspark/status.py'>"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pyspark.status"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "#help(pyspark)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 启动参数配置-SparkConf\n",
    "与命令行启动的参数是一样的效果。可以参考：http://my.oschina.net/u/2306127/blog/639414"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<pyspark.conf.SparkConf object at 0x7ff58be51d68>\n"
     ]
    }
   ],
   "source": [
    "conf = (SparkConf()\n",
    "         .setMaster(\"local\")\n",
    "         .setAppName(\"MyApp\")\n",
    "         .set(\"spark.executor.memory\", \"1g\"))\n",
    "print(conf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[('spark.master', 'local'), ('spark.app.name', 'MyApp'), ('spark.executor.memory', '1g'), ('spark.submit.deployMode', 'client')]\n"
     ]
    }
   ],
   "source": [
    "conf_kv = conf.getAll()\n",
    "print(conf_kv)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#help(conf)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 启动运行环境-SparkContext\n",
    "SparkContext是所有运行时的容器。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<class 'pyspark.context.SparkContext'>\n",
      "<pyspark.context.SparkContext object at 0x7ff5ac9889b0>\n"
     ]
    }
   ],
   "source": [
    "if sc is None:\n",
    "    sc = SparkContext(conf = conf)\n",
    "    \n",
    "print(type(sc))\n",
    "print(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "#help(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "local-1458525444126\n"
     ]
    }
   ],
   "source": [
    "print(sc.applicationId)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 创建数据序列"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0, 1, 2, 3, 4]"
      ]
     },
     "execution_count": 40,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc.range(5).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[2, 3]"
      ]
     },
     "execution_count": 41,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc.range(2, 4).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[1, 3, 5]"
      ]
     },
     "execution_count": 42,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc.range(1, 7, 2).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 切分数据任务\n",
    "每一个节点拥有被切分数据的一个分区。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[[0], [2], [3], [4], [6]]"
      ]
     },
     "execution_count": 37,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[[], [0], [], [2], [4]]"
      ]
     },
     "execution_count": 39,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc.parallelize(range(0, 6, 2), 5).glom().collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0, 1, 4, 9, 16, 25]"
      ]
     },
     "execution_count": 43,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "myRDD = sc.parallelize(range(6), 3)\n",
    "sc.runJob(myRDD, lambda part: [x * x for x in part])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0, 1, 16, 25]"
      ]
     },
     "execution_count": 44,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "myRDD = sc.parallelize(range(6), 3)\n",
    "sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 创建广播变量\n",
    "广播变量在所有节点上复制和同步。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[1, 2, 3, 4, 5]"
      ]
     },
     "execution_count": 45,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "b = sc.broadcast([1, 2, 3, 4, 5])\n",
    "b.value\n",
    "#[1, 2, 3, 4, 5]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]"
      ]
     },
     "execution_count": 48,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()\n",
    "#[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]\n",
    "#b.unpersist()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 停止Spark运行环境"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sc.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
